- 
                Notifications
    You must be signed in to change notification settings 
- Fork 0
Clone kafka 18894 #32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| WalkthroughThe change replaces direct ConfigProvider usage with Plugin-wrapped providers across configuration resolution paths in clients and Connect. Constructors, fields, and cleanup paths now operate on Map<String, Plugin>. Tests are updated accordingly, a monitorable test provider is added, and minor formatting and Javadoc updates are included. Changes
 Sequence Diagram(s)sequenceDiagram
  autonumber
  actor Caller as Config user (Worker/MirrorMaker/AbstractConfig)
  participant Plugins as Plugins.newConfigProvider
  participant Plugin as Plugin<ConfigProvider>
  participant Transformer as ConfigTransformer
  participant Prov as ConfigProvider
  participant Metrics as PluginMetrics
  Caller->>Plugins: newConfigProvider(name, classLoaderUsage, metrics)
  Plugins->>Prov: instantiate & configure (prefix derived)
  Plugins->>Metrics: register provider metrics (if Monitorable)
  Plugins->>Plugin: wrapInstance(provider, metrics, prefix)
  Plugins-->>Caller: Plugin<ConfigProvider>
  Caller->>Transformer: new(..., Map<name, Plugin>)
  Note over Transformer: Store providerPlugins
  Caller->>Transformer: transform(props)
  Transformer->>Plugin: get provider by name
  alt provider exists
    Plugin->>Prov: get()
    Prov-->>Transformer: get(path, keys) -> ConfigData
    Transformer-->>Caller: transformed props + ttl
  else provider missing
    Transformer-->>Caller: unchanged props
  end
  Caller->>Plugin: close()
  Note right of Plugin: Closed via Utils.closeQuietly with plugin-specific message
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
 Pre-merge checks and finishing touches❌ Failed checks (2 warnings, 1 inconclusive)
 ✨ Finishing touches
 🧪 Generate unit tests
 Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.39.5)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.javaThanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment  | 
| The user @DDShantanuBadmanji does not have an active seat allocated for code reviews. Please contact your organization admin to assign a seat or upgrade your subscription. | 
| Summary of ChangesHello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant enhancement to how ConfigProvider instances are managed and monitored within Kafka. The primary goal is to enable better observability by allowing ConfigProvider implementations to expose metrics through a new Plugin wrapper. This involves updates to core configuration and runtime components to ensure that ConfigProvider instances are consistently wrapped and their metrics are properly registered and tagged. The changes also include new test cases to validate this enhanced monitoring functionality. Highlights
 Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either  
 Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a  Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
 | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces metrics support for ConfigProvider implementations. The core change is wrapping ConfigProvider instances in a Plugin<T> object, which handles metrics registration for providers that implement the Monitorable interface.
The changes are well-executed and consistently applied throughout the codebase:
- The Plugins.newConfigProvidermethod has been refactored to return aPlugin<ConfigProvider>and to correctly wire up metrics when available.
- All call sites and consumers of ConfigProviderhave been updated to work with the newPluginwrapper, includingAbstractConfig,ConfigTransformer,Worker, andMirrorMakerConfig.
- The lifecycle of the ConfigProviderand its associated metrics is correctly managed through thePluginwrapper'sAutoCloseableimplementation.
- Unit tests have been updated to reflect these changes, and new tests have been added to verify the metrics functionality and the plugin lifecycle, which is great.
- The use of Map.ofin tests is a nice modernization touch.
Overall, this is a solid contribution that enhances the observability of config providers. The code is clean, and the changes are logical and well-tested. I have no further suggestions for improvement. Great work!
| The user @DDShantanuBadmanji does not have an active seat allocated for code reviews. Please contact your organization admin to assign a seat or upgrade your subscription. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- Vagrantfile(1 hunks)
- clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java(4 hunks)
- clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java(3 hunks)
- clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java(1 hunks)
- clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java(5 hunks)
- clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java(1 hunks)
- connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java(2 hunks)
- connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java(1 hunks)
- connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java(3 hunks)
- connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java(3 hunks)
- connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java(5 hunks)
- connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java(5 hunks)
- connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
withPluginMetrics(6299-6301)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils(93-1700)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
WorkerConfig(58-463)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
Metrics(70-696)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
Plugins(59-696)
🔇 Additional comments (24)
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)
32-35: LGTM! Clear documentation of the Monitorable integration.The Javadoc update accurately documents the new capability for ConfigProvider implementations to register metrics via the Monitorable interface, and clearly lists the automatic tags that will be added.
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (3)
20-20: LGTM! Import added for Plugin wrapper.The new import supports the plugin-wrapped ConfigProvider pattern introduced in this PR.
49-49: LGTM! Test setup updated to use Plugin-wrapped provider.The test now correctly wraps the TestConfigProvider with
Plugin.wrapInstance, aligning with the broader PR changes that adopt plugin-based ConfigProvider management across the codebase.
54-105: LGTM! Test inputs modernized to use Map.of.The tests have been updated to use
Map.of(...)instead ofCollections.singletonMap(...)for constructing input maps. This is a clean modernization that improves readability while maintaining the same test behavior.connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)
212-222: LGTM! Provider initialization updated to plugin-based pattern.The changes correctly update the provider initialization flow to use
Plugin<ConfigProvider>:
- The provider map type is now
Map<String, Plugin<ConfigProvider>>
newConfigProvidernow returns a plugin-wrapped provider and accepts metrics for instrumentation- The plugin map is correctly passed to
WorkerConfigTransformerThis aligns with the broader PR effort to adopt plugin-based lifecycle management and enable metrics registration for Monitorable ConfigProvider implementations.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (3)
23-23: LGTM! Import added for Plugin wrapper.The import supports the plugin-wrapped ConfigProvider pattern.
46-52: LGTM! Constructor and field updated to use Plugin-wrapped providers.The changes correctly update the class to:
- Store
Map<String, Plugin<ConfigProvider>>instead of raw providers- Accept plugin-wrapped providers in the constructor
- Pass the plugin map to
ConfigTransformerThis aligns with the broader PR effort to adopt plugin-based lifecycle management for ConfigProvider instances.
101-101: LGTM! Cleanup updated to close plugin-wrapped providers.The close method now correctly closes plugin-wrapped providers using
Utils.closeQuietly. This ensures both the ConfigProvider instance and any associated plugin metrics are properly cleaned up.connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4)
25-28: LGTM! Imports added for monitorable provider testing.The new imports support testing of the Monitorable interface integration with ConfigProvider plugins.
381-402: LGTM! Test updated to use plugin-based ConfigProvider API.The test correctly updates to the new API where:
newConfigProviderreturnsPlugin<ConfigProvider>instead of rawConfigProvider- The provider instance is accessed via
plugin.get()- Null metrics are passed since this test doesn't verify metrics behavior
404-412: LGTM! New test verifies correct initialization order for Monitorable providers.The test verifies that when a ConfigProvider implements Monitorable and metrics are provided:
- The provider is properly instantiated
- The
withPluginMetricsmethod is called afterconfigure()(verified by the assertion in CustomMonitorableConfigProvider)This ensures the plugin lifecycle is correct for metric-enabled providers.
810-816: LGTM! Test helper verifies initialization order.The CustomMonitorableConfigProvider test class correctly asserts that
configuredis true whenwithPluginMetricsis called, ensuring that configuration happens before metric registration. This validates the correct initialization order in the plugin lifecycle.connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (3)
69-69: LGTM! Plugin wrapping aligns with the broader refactoring.The test correctly wraps the TestConfigProvider with
Plugin.wrapInstance, passingnullfor metrics which is appropriate for this unit test context. The extra tags map correctly identifies the provider as "test".
75-75: LGTM! Cleaner test data construction.The conversion from
Collections.singletonMap(...)toMap.of(...)improves readability and is consistent with modern Java practices.Also applies to: 100-100, 115-115, 122-122
150-157: LGTM! TestConfigProvider updated consistently.The test provider's ConfigData creation now uses
Map.of(...)instead of Collections methods, maintaining consistency with the rest of the test file changes.connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
379-390: LGTM! Mock provider correctly wrapped with Plugin.The
mockFileConfigProvidermethod now returns aPlugin<ConfigProvider>instead of a rawConfigProvider, correctly wrapping the instance with extra tags to identify the file provider. The null metrics argument is appropriate for this mock context.
2904-2927: LGTM! Well-structured test for monitorable config providers.The test correctly:
- Configures multiple config providers ("monitorable" and "monitorable2")
- Mocks the plugins to return Plugin-wrapped MonitorableConfigProvider instances with metrics
- Verifies that metrics are registered with the expected tags for each provider
The test effectively validates the metrics integration for monitorable config providers.
2929-2950: LGTM! Clean helper methods for metrics assertions.The
expectedTagsandassertMetricshelper methods provide clear, reusable logic for:
- Constructing expected metric tags with config, class, and extra tags
- Verifying metrics are registered in the "plugins" group with the correct tags, name, and description
The implementation correctly uses LinkedHashMap to maintain tag ordering and provides clear assertion messages.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
632-633: API change: Method now returns Plugin-wrapped ConfigProvider.The method signature changes from returning
ConfigProvidertoPlugin<ConfigProvider>and accepts a newMetricsparameter. The parameter also changes fromproviderPrefixtoproviderName, with the prefix now derived internally. This is a breaking API change that aligns with the broader refactoring to enable metrics for config providers.Ensure all callers have been updated to handle the Plugin-wrapped return type. Based on the PR context, this appears to be addressed in Worker.java and WorkerConfigTransformer.java.
649-649: LGTM! ConfigProvider correctly wrapped with metrics support.The method now returns
Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName)), which:
- Wraps the ConfigProvider instance for lifecycle management
- Passes metrics for monitoring (if the provider implements Monitorable)
- Tags the provider with "config" → "config.providers" and "provider" → providerName for metric identification
This enables the metrics integration tested in WorkerTest.testMonitorableConfigProvider.
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (4)
23-23: LGTM!The import is necessary for wrapping ConfigProvider instances with Plugin.
550-559: LGTM!The transition from
Map<String, ConfigProvider>toMap<String, Plugin<ConfigProvider>>is handled correctly. The cleanup path properly closes Plugin instances usingUtils.closeQuietly, and the ConfigTransformer construction aligns with the plugin-wrapped provider pattern mentioned in the enriched summary.
598-602: LGTM!The method signature correctly reflects the plugin-wrapped provider pattern. Since this is a private method, there's no public API impact.
629-632: Ignore null‐metrics concern: passingnullfor theMetricsparameter toPlugin.wrapInstanceinAbstractConfigis intentional and consistent with existing usage;AbstractConfigdoes not maintain aMetricsinstance.Likely an incorrect or invalid review comment.
| * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances. | ||
| */ | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adjust Javadoc to mention Plugin<ConfigProvider>.
The parameter documentation still refers to raw provider instances, but we now expect plugin-wrapped providers. Please update the wording to avoid confusion.
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
around lines 65 to 66, update the Javadoc for the parameter to reflect that the
map contains Plugin<ConfigProvider> values rather than raw ConfigProvider
instances; change the wording to state "a Map of provider names to
Plugin<ConfigProvider> instances" (or similar) and briefly note that each entry
is a plugin-wrapped ConfigProvider to avoid confusion.
| public ConfigData get(String path) { | ||
| return null; | ||
| } | ||
|  | ||
| @Override | ||
| public ConfigData get(String path, Set<String> keys) { | ||
| return null; | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return valid ConfigData instead of null.
ConfigTransformer dereferences the result of ConfigProvider#get(...). Returning null here will trigger an NPE the moment this provider is exercised. Please return an empty ConfigData (or one populated for the test) so the transformer can proceed safely.
     @Override
     public ConfigData get(String path) {
-        return null;
+        return new ConfigData(Map.of());
     }
     @Override
     public ConfigData get(String path, Set<String> keys) {
-        return null;
+        return new ConfigData(Map.of());
     }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public ConfigData get(String path) { | |
| return null; | |
| } | |
| @Override | |
| public ConfigData get(String path, Set<String> keys) { | |
| return null; | |
| } | |
| @Override | |
| public ConfigData get(String path) { | |
| return new ConfigData(Map.of()); | |
| } | |
| @Override | |
| public ConfigData get(String path, Set<String> keys) { | |
| return new ConfigData(Map.of()); | |
| } | 
🤖 Prompt for AI Agents
In
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
around lines 41 to 48, the two get(...) methods currently return null causing a
potential NPE in ConfigTransformer; replace these null returns with valid
ConfigData instances (e.g., return new ConfigData(Collections.emptyMap(),
System.currentTimeMillis()) or a ConfigData populated with test values) so the
provider always returns a non-null ConfigData for both get(String) and
get(String, Set<String>) paths.
Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.
If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.
If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.
Summary by CodeRabbit
New Features
Documentation
Refactor
Tests
Style